Skip to content

feat(nexus): Early Nexus handler support #1708

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 27 commits into
base: main
Choose a base branch
from

Conversation

bergundy
Copy link
Member

@bergundy bergundy commented May 8, 2025

  • This PR is blocked on publishing an initial version of the nexus-rpc package.
  • There is a TODO to figure out HandlerError and OperationError message rehydration, pending discussion.
  • Interceptors not yet implemented.
  • WorkflowRunOperation and getClient() not implemented for the @temporalio/nexus package.
  • Tests use the HTTP API directly in lieu of a workflow caller or strongly typed client, we can refactor those later.

@bergundy bergundy requested a review from a team as a code owner May 8, 2025 01:26
@@ -91,6 +91,17 @@ export async function decodeArrayFromPayloads(
return arrayFromPayloads(payloadConverter, await decodeOptional(payloadCodecs, payloads));
}

/**
* Decode `payloads` and then return {@link arrayFromPayloads}`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docstring is wrong

if (nexusServices.has(s.name)) {
throw new TypeError(`Duplicate registration of nexus service ${s.name}`);
}
const ops = new Map<string, nexus.OperationHandler<any, any> | nexus.SyncOperationHandler<any, any>>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should that be part of the Nexus SDK? Logic is trivial at present, but there I expect there will be some normalization work to be done at a later point (e.g. once we add decorators) which we wouldn't to diverge from one Nexus implementation to another. This also seems to be what we did in Java, and kind-of what we did in Go.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we discussed moving this to the Nexus SDK with Chad yesterday. I will do that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I missed that part of the conversation.

I will do that.

I only wanted to confirm that we agree on direction. I can do it myself if you don't have time/prefer not to do it (same apply to all suggestions I make on this PR).

import getPort from 'get-port';
import * as nexus from 'nexus-rpc';
import * as protoJsonSerializer from 'proto3-json-serializer';
import * as temporalnexus from '@temporalio/nexus';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dependency is missing in package.json and tsconfig.json

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will fix.

});

Runtime.install({ logger });
t.context.httpPort = await getPort();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO(JWH): This will be flaky in CI due to port collisions, and users will often need to do that themselves. Move this to Core.

test.beforeEach(async (t) => {
const taskQueue = t.title + randomUUID();
const { env } = t.context;
const response = await env.connection.operatorService.createNexusEndpoint({
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand this is the way for now, but what's the intent regarding this? Do we expect users to have to do the same in their own Temporal+Nexus tests?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good question. in Java we have a shortcut, we should probably make it easier in the test environment for Core based SDKs. @dandavison, wondering if you considered this in Python.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I have not considered APIs for user Temporal Nexus testing yet; I'll add a TODO to the code. In my tests I have a helper function like this for creating Nexus endpoints.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need an issue to track this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Created one for Python SDK-3837 "Python Nexus user testing utilities and docs"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would have made this a features issue with checkboxes for each SDK but 🤷, thanks!

const input = await decodeFromPayload(this.dataConverter, payload);
if (typeof this.handler === 'function') {
const handler = this.handler as nexus.SyncOperationHandler<unknown, unknown>;
const output = await this.invokeUserCode('startOperation', handler.bind(undefined, input, options));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be preferable to add some defensive checks here to prevent a user from returning a HandlerStartOperationResultSync or HandlerStartOperationResultAsync from their SyncOperationHandler. Sounds like an easy error to make. But I don't think there's currently a safe way of doing this.

options: nexus.StartOperationOptions
): Promise<coresdk.nexus.INexusTaskCompletion> {
try {
const input = await decodeFromPayload(this.dataConverter, payload);
Copy link
Contributor

@dandavison dandavison May 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the data converter fails here, I assume it should result in a 400 BAD_REQUEST Nexus HTTP response, right? If that's correct can you confirm that it does?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a test worth adding. Thanks!

bergundy added 8 commits May 14, 2025 20:02
- This PR is blocked on publishing an initial version of the `nexus-rpc` package.
- There is a TODO to figure out HandlerError and OperationError message rehydration, pending discussion.
- Interceptors not yet implemented.
- `WorkflowRunOperation` and `getClient()` not implemented for the `@temporalio/nexus` package.
- Tests use the HTTP API directly in lieu of a workflow caller or strongly typed client, we can refactor those later.
import {
convertWorkflowEventLinkToNexusLink,
convertNexusLinkToWorkflowEventLink,
} from '@temporalio/worker/lib/nexus/link-converter';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
} from '@temporalio/worker/lib/nexus/link-converter';
} from '@temporalio/nexus/lib/link-converter';

) => Promise<WorkflowHandle<O>>;

export class WorkflowRunOperation<I, O> implements nexus.OperationHandler<I, O> {
constructor(readonly handler: WorkflowRunOperationHandler<I, O>) {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that WorkflowRunOperation implements OperationHandler, isn't there an argument that WorkflowRunOperation should be called WorkflowRunOperationHandler and that the function passed into the constructor (type currently called WorkflowRunOperationHandler) should be called startHandler: WorkflowRunOperationStartHandler?

}
if (!token.wid) {
throw new TypeError('invalid workflow run token: missing workflow ID (wid)');
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that Go and TS don't validate the 'ns' field. Is that deliberate?

// Currently, we only have one type of operation token: WorkflowRun.
type OperationTokenType = typeof OPERATION_TOKEN_TYPE_WORKFLOW_RUN;

interface WorkflowRunOperationToken {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will you still be happy with this name when we have other operations such as WorkflowSignalWithStartOperation and WorkflowUpdateWithStartOperation that start workflows? Or is it better to give it a slightly more general name that indicates that it represents a workflow but is agnostic regarding how that workflow was started?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, OPERATION_TOKEN_TYPE_WORKFLOW might be a better name ("RUN" does not mean a workflow run here; it's just a wid hence extends over CAN)

@mjameswh mjameswh changed the title Nexus handler near complete implementation feat(nexus): Add early Nexus handler support Jun 4, 2025
@mjameswh mjameswh changed the title feat(nexus): Add early Nexus handler support feat(nexus): Early Nexus handler support Jun 4, 2025
workflowId,
// To test attaching multiple callers to the same operation.
workflowIdConflictPolicy: 'USE_EXISTING',
});
Copy link
Contributor

@dandavison dandavison Jun 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The design proposed here is that, when a user wants to start a workflow that is the backing execution for their Nexus operation, they must call a special temporalnexus.startWorkflow instead of the usual client.startWorkflow. The "special" version takes care of

  • propagating nexus request ID to StartWorkflow request ID
  • establishing bidirectional linkage

I'm going to note some downsides with this design; no implication intended that other designs are free of downsides.

  1. The user may not know to do this, and obtain a normal Temporal client and start their workflow in the usual way via client.startWorkflow.
  2. The user may write a Nexus operation that starts multiple workflows, intending only the final one to be the "backing" workflow. But if they use temporalnexus.startWorkflow for more than one workflow, they will get unexpected behavior.

Copy link
Contributor

@dandavison dandavison Jun 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm partly mentioning this because I had been using a similar design for Python (in Python I was hoping not to introduce a second start_workflow). But there are similar challenges.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So far we've been saying that it's better for the user to be explicit about the workflow that they intend to use to back the operation. Sure, there are cases where users may try to call this function twice in a handler and we can decide whether it's undefined behavior or a runtime error.

I don't think implicitly doing all of the things the "special" function does in client.startWorkflow is the right approach. And note that the context here is passed explicitly as we've discussed in the long explicit vs. implicit context discussion, let's not go back on that.

I like this version the best as it is consistent with Go, and allows a user to intercept failures.

FTR, this helper also propagates the callbacks and populates onConflictOptions when USE_EXISTING is used to attach a callback to an already running workflow.

Copy link
Contributor

@dandavison dandavison Jun 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree we don't want to do the "special" things implicitly in client.startWorkflow.

I think what we want to do is almost exactly what you're doing, but instead of returning WorkflowHandle<OutputT>, return a type like WorkflowOperationToken<OutputT>.

For Python's solution to this, see https://github.com/temporalio/sdk-python/pull/813/files#r2160553793.

With what you've got currently, the user could easily make a mistake and use the normal client.startWorkflow, thinking that they are creating a nexusified workflow. And that would be very bad: there would be no type-check-time error and no request-time error; they will not be alerted to their mistake until they see that their operation result is never delivered and there is no bidirectional linking.

But having temporalnexus.startWorkflow return WorkflowOperationToken<OutputT> solves that: they still have the familiar startWorkflow interface, but it's impossible for the user to make the mistake in Typescript due to the type error.

These are the requirements that I believe we should be designing for here:

  1. A Nexus operation handler must be able to start a nexusified workflow:
    a. delivers the final Nexus operation result
    b. has bidirectional linking
    c. Nexus request ID propagated into the StartWorkflow request
  2. A Nexus operation handler must also be able to start “normal” workflows.
  3. It should not be possible for a user who is trying to do (1) to get confused and accidentally do (2) instead.
  4. The above must all hold whether a user is using the “shorthand” form (WorkflowRunOperation) or manually implementing an operation handler.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants